// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements.  See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License.  You may obtain a copy of the License at
//
//    http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package beam

import (
	"context"
	"fmt"
	"reflect"

	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
)

func addParDoCtx(err error, s Scope) error {
	return errors.WithContextf(err, "inserting ParDo in scope %s", s)
}

// TryParDo attempts to insert a ParDo transform into the pipeline. It may fail
// for multiple reasons, notably that the DoFn is not valid or cannot be bound
// -- due to type mismatch, say -- to the incoming PCollections.
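//
// A minimal construction-time sketch (myDoFn stands in for any registered
// DoFn):
//
//	outs, err := beam.TryParDo(s, myDoFn, col)
//	if err != nil {
//		// handle the construction failure, e.g. a DoFn type mismatch
//	}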
func TryParDo(s Scope, dofn any, col PCollection, opts ...Option) ([]PCollection, error) {
	side, typedefs, err := validate(s, col, opts)
	if err != nil {
		return nil, addParDoCtx(err, s)
	}

	doFnOpt := graph.NumMainInputs(graph.MainSingle)
	// Check the PCollection for any keyed type (not just KV specifically).
	if typex.IsKV(col.Type()) {
		doFnOpt = graph.NumMainInputs(graph.MainKv)
	} else if typex.IsCoGBK(col.Type()) {
		doFnOpt = graph.CoGBKMainInput(len(col.Type().Components()))
	}
	fn, err := graph.NewDoFn(dofn, doFnOpt)
	if err != nil {
		return nil, addParDoCtx(err, s)
	}

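	// Gather the main input node and all side input nodes, validating that
	// each side input's windowing is compatible with the main input's.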
	in := []*graph.Node{col.n}
	inWfn := col.n.WindowingStrategy().Fn
	for i, s := range side {
		sideNode := s.Input.n
		sideWfn := sideNode.WindowingStrategy().Fn
		if sideWfn.Kind == window.Sessions {
			return nil, fmt.Errorf("error with side input %d in DoFn %v: PCollections using merging WindowFns are not supported as side inputs. Consider re-windowing the side input PCollection before use", i, fn)
		}
		if (inWfn.Kind == window.GlobalWindows) && (sideWfn.Kind != window.GlobalWindows) {
			return nil, fmt.Errorf("main input is global windowed in DoFn %v but side input %v is not, cannot map windows correctly. Consider re-windowing the side input PCollection before use", fn, i)
		}
		if (sideWfn.Kind == window.GlobalWindows) && !sideNode.Bounded() {
			// TODO(https://github.com/apache/beam/issues/21596): Replace this warning with an error return when proper streaming test functions have been added.
			log.Warnf(context.Background(), "side input %v is global windowed in DoFn %v but is unbounded, DoFn will block until end of Global Window. Consider windowing your unbounded side input PCollection before use. This will cause your pipeline to fail in a future release, see https://github.com/apache/beam/issues/21596 for details", i, fn)
		}
		in = append(in, s.Input.n)
	}

	var rc *coder.Coder
	// SDFs always encode restrictions as KV<restriction, watermark state | bool(false)>.
	if fn.IsSplittable() {
		sdf := (*graph.SplittableDoFn)(fn)
		restT := typex.New(sdf.RestrictionT())
		// If there is no watermark estimator state, use a boolean as a placeholder.
		weT := typex.New(reflect.TypeOf(true))
		if sdf.IsStatefulWatermarkEstimating() {
			weT = typex.New(sdf.WatermarkEstimatorStateT())
		}
		rc, err = inferCoder(typex.NewKV(restT, weT))
		if err != nil {
			return nil, addParDoCtx(err, s)
		}
	}

	edge, err := graph.NewParDo(s.real, s.scope, fn, in, rc, typedefs)
	if err != nil {
		return nil, addParDoCtx(err, s)
	}

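	// For stateful DoFns, infer a coder for each declared state cell, and for
	// its key type in the case of keyed state (such as map or set state).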
	pipelineState := fn.PipelineState()
	if len(pipelineState) > 0 {
		edge.StateCoders = make(map[string]*coder.Coder)
		for _, ps := range pipelineState {
			if ct := ps.CoderType(); ct != nil {
				sT := typex.New(ps.CoderType())
				c, err := inferCoder(sT)
				if err != nil {
					return nil, addParDoCtx(err, s)
				}
				edge.StateCoders[graphx.UserStateCoderID(ps)] = c
			}
			if kct := ps.KeyCoderType(); kct != nil {
				kT := typex.New(kct)
				kc, err := inferCoder(kT)
				if err != nil {
					return nil, addParDoCtx(err, s)
				}
				edge.StateCoders[graphx.UserStateKeyCoderID(ps)] = kc
			}
		}
	}

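	// Wrap each output node of the new edge in a PCollection with an
	// inferred coder.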
	var ret []PCollection
	for _, out := range edge.Output {
		c := PCollection{out.To}
		c.SetCoder(NewCoder(c.Type()))
		ret = append(ret, c)
	}
	return ret, nil
}

// ParDoN inserts a ParDo with any number of outputs into the pipeline.
func ParDoN(s Scope, dofn any, col PCollection, opts ...Option) []PCollection {
	return MustN(TryParDo(s, dofn, col, opts...))
}

// ParDo0 inserts a ParDo with zero outputs into the pipeline.
func ParDo0(s Scope, dofn any, col PCollection, opts ...Option) {
	ret := MustN(TryParDo(s, dofn, col, opts...))
	if len(ret) != 0 {
		panic(formatParDoError(dofn, len(ret), 0))
	}
}

// ParDo is the core element-wise PTransform in Apache Beam, invoking a
// user-specified function on each of the elements of the input PCollection
// to produce zero or more output elements, all of which are collected into
// the output PCollection. Use one of the ParDo variants for a different
// number of output PCollections. The PCollections do not need to have the
// same types.
//
// Elements are processed independently, and possibly in parallel across
// distributed cloud resources. The ParDo processing style is similar to what
// happens inside the "Mapper" or "Reducer" class of a MapReduce-style
// algorithm.
//
// # DoFns
//
// The function to use to process each element is specified by a DoFn, either as
// a single function or as a struct with methods, notably ProcessElement. The
// struct may also define Setup, StartBundle, FinishBundle and Teardown methods.
// The struct is JSON-serialized and may contain construction-time values.
//
// Functions and types used as DoFns must be registered with Beam using the
// `register` package, so they may execute on distributed workers.
// Functions must not be anonymous or closures, or they will fail at execution time.
//
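// A minimal sketch of a structural DoFn (the capFn type and its field are
// illustrative, not part of the SDK):
//
//	type capFn struct {
//		MinLength int // construction-time value, JSON-serialized to workers
//	}
//
//	func (f *capFn) ProcessElement(word string, emit func(string)) {
//		if len(word) >= f.MinLength {
//			emit(word)
//		}
//	}
//
//	func init() {
//		register.DoFn2x0[string, func(string)](&capFn{})
//		register.Emitter1[string]()
//	}
//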
// Conceptually, when a ParDo transform is executed, the elements of the input
// PCollection are first divided up into some number of "bundles". These are
// farmed off to distributed worker machines (or locally on a local runner instance).
// For each bundle of input elements processing proceeds as follows:
//
//   - If a struct, a fresh instance of the argument DoFn is created on a
//     worker from its JSON serialization, and the Setup method is called on this
//     instance, if present. A runner may reuse DoFn instances for multiple
//     bundles. A DoFn that has terminated abnormally (by returning an error)
//     will never be reused.
//   - The DoFn's StartBundle method, if provided, is called to initialize it.
//   - The DoFn's ProcessElement method is called on each of the input elements
//     in the bundle.
//   - The DoFn's FinishBundle method, if provided, is called to complete its
//     work. After FinishBundle is called, the framework will not again invoke
//     ProcessElement or FinishBundle until a new call to StartBundle has
//     occurred.
//   - If any of Setup, StartBundle, ProcessElement or FinishBundle methods
//     return an error, the Teardown method, if provided, will be called on the
//     DoFn instance.
//   - If a runner will no longer use a DoFn, the Teardown method, if provided,
//     will be called on the discarded instance.
//
// Each of the calls to any of the DoFn's processing methods can produce zero
// or more output elements. All of the output elements from all of the DoFn
// instances are included in an output PCollection.
//
// For example:
//
//	func stringLen(word string) int { return len(word) }
//	func init() { register.Function1x1(stringLen) }
//
//	words := beam.ParDo(s, &Foo{...}, ...)
//	lengths := beam.ParDo(s, stringLen, words)
//
// Each output element has the same timestamp and is in the same windows as its
// corresponding input element. The timestamp can be accessed and/or emitted by
// including an EventTime-typed parameter. The name of the function or struct is
// used as the DoFn name. Function literals do not have stable names and should
// thus not be used in production code.
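//
// A short sketch of accessing and re-emitting the timestamp (stampedLen is
// illustrative):
//
//	func stampedLen(ts beam.EventTime, word string) (beam.EventTime, int) {
//		return ts, len(word) // read the timestamp and emit it unchanged
//	}
//	func init() { register.Function2x2(stampedLen) }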
//
// # Side Inputs
//
// While a ParDo processes elements from a single "main input" PCollection, it
// can take additional "side input" PCollections. These side input PCollections
// are computed by earlier pipeline operations, passed in to the ParDo transform
// using SideInput options, and their contents are accessible to each of the
// DoFn invocations. For example:
//
//	func filterLessThanCutoff(word string, cutoff int, emit func(string)) {
//		if len(word) < cutoff {
//			emit(word)
//		}
//	}
//	func init() { register.Function3x0(filterLessThanCutoff) }
//
//	words := ...
//	cutoff := ...  // Singleton PCollection<int>
//	smallWords := beam.ParDo(s, filterLessThanCutoff, words, beam.SideInput{Input: cutoff})
//
// # Additional Outputs
//
// Optionally, a ParDo transform can produce zero or multiple output
// PCollections. Note the use of ParDo2 to specify 2 outputs. For example:
//
//	func partitionAtCutoff(word string, cutoff int, small, big func(string)) {
//		if len(word) < cutoff {
//			small(word)
//		} else {
//			big(word)
//		}
//	}
//	func init() { register.Function4x0(partitionAtCutoff) }
//
//	words := ...
//	cutoff := ...  // Singleton PCollection<int>
//	small, big := beam.ParDo2(s, partitionAtCutoff, words, beam.SideInput{Input: cutoff})
//
// By default, the Coder for the elements of each output PCollection is
// inferred from the concrete type.
//
// # No Global Shared State
//
// There are three main ways to initialize the state of a DoFn instance
// processing a bundle (a short sketch follows the list):
//
//   - Define public instance variable state. This state will be automatically
//     JSON serialized and then deserialized in the DoFn instances created for
//     bundles. This method is good for state known when the original DoFn is
//     created in the main program, if it's not overly large. This is not
//     suitable for any state which must only be used for a single bundle, as
//     DoFns may be used to process multiple bundles.
//
//   - Compute the state as a singleton PCollection and pass it in as a side
//     input to the DoFn. This is good if the state needs to be computed by the
//     pipeline, or if the state is very large and so is best read from file(s)
//     rather than sent as part of the DoFn's serialized state.
//
//   - Initialize the state in each DoFn instance, in a StartBundle method.
//     This is good if the initialization doesn't depend on any information
//     known only by the main program or computed by earlier pipeline
//     operations, but is the same for all instances of this DoFn for all
//     program executions, say setting up empty caches or initializing constant
//     data.
//
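// A minimal sketch combining the first and third options (the lookupFn type
// is illustrative):
//
//	type lookupFn struct {
//		Prefix string            // serialized construction-time state
//		cache  map[string]string // unexported: rebuilt for each bundle
//	}
//
//	func (f *lookupFn) StartBundle(_ context.Context, _ func(string)) {
//		f.cache = make(map[string]string) // per-bundle initialization
//	}
//
//	func (f *lookupFn) ProcessElement(key string, emit func(string)) {
//		v, ok := f.cache[key]
//		if !ok {
//			v = f.Prefix + key
//			f.cache[key] = v
//		}
//		emit(v)
//	}
//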
// ParDo operations are intended to be able to run in parallel across multiple
// worker machines. This precludes easy sharing and updating mutable state
// across those machines. There is no support in the Beam model for
// communicating and synchronizing updates to shared state across worker
// machines, so programs should not access any mutable global variable state in
// their DoFn, without understanding that the Go processes for the main program
// and workers will each have its own independent copy of such state, and there
// won't be any automatic copying of that state across processes. All
// information should be communicated to DoFn instances via main and side
// inputs and serialized state, and all output should be communicated from a
// DoFn instance via output PCollections, in the absence of external
// communication mechanisms written by user code.
//
// # Splittable DoFns
//
// Splittable DoFns are DoFns that are able to split work within an element,
// as opposed to only at element boundaries like normal DoFns. This is useful
// for DoFns that emit many outputs per input element and can distribute that
// work among multiple workers. The most common examples of this are sources.
//
// In order to split work within an element, splittable DoFns use the concept of
// restrictions, which are objects that are associated with an element and
// describe a portion of work on that element. For example, a restriction
// associated with a filename might describe what byte range within that file to
// process. In addition to restrictions, splittable DoFns also rely on
// restriction trackers to track progress and perform splits on a restriction
// currently being processed. See the `RTracker` interface in core/sdf/sdf.go
// for more details.
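//
// For example, the SDK's offsetrange package (io/rtrackers/offsetrange)
// provides a ready-made restriction describing a half-open offset interval:
//
//	// a sketch: restrict processing to the first kilobyte of an element
//	r := offsetrange.Restriction{Start: 0, End: 1024}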
//
// # Splitting
//
// Splitting means taking one restriction and splitting it into two or more that
// cover the entire input space of the original one. In other words, processing
// all the split restrictions should produce identical output to processing
// the original one.
//
// Splitting occurs in two stages. The initial splitting occurs before any
// restrictions have started processing. This step is used to split large
// restrictions into smaller ones that can then be distributed among multiple
// workers for processing. Initial splitting is user-defined and optional.
//
// Dynamic splitting occurs during the processing of a restriction in runners
// that have implemented it. If there are available workers, runners may split
// the unprocessed portion of work from a busy worker and shard it to available
// workers in order to better distribute work. With unsplittable DoFns this can
// only occur on element boundaries, but for splittable DoFns this split
// can land within a restriction and will require splitting that restriction.
//
//   - Note: Dataflow is currently the only runner with support for both
//     initial and dynamic splitting. Other runners do not support dynamic
//     splitting, and stragglers will therefore not be split during execution
//     with liquid sharding.
//
// # Splittable DoFn Methods
//
// Making a splittable DoFn requires the following methods to be implemented on
// a DoFn in addition to the usual DoFn requirements. In the following
// method signatures `elem` represents the main input elements to the DoFn, and
// should match the types used in ProcessElement. `restriction` represents the
// user-defined restriction, and can be any type as long as it is consistent
// throughout all the splittable DoFn methods:
//
//   - `CreateInitialRestriction(context.Context?, elem) (restriction, error?)`
//     CreateInitialRestriction creates an initial restriction encompassing an
//     entire element. The restriction created stays associated with the element
//     it describes.
//   - `SplitRestriction(context.Context?, elem, restriction) ([]restriction, error?)`
//     SplitRestriction takes an element and its initial restriction, and
//     optionally performs an initial split on it, returning a slice of all the
//     split restrictions. If no splits are desired, the method returns a slice
//     containing only the original restriction. This method will always be
//     called on each newly created restriction before it is processed.
//   - `RestrictionSize(context.Context?, elem, restriction) (float64, error?)`
//     RestrictionSize returns a cheap size estimation for a restriction. This
//     size is an abstract non-negative scalar value that represents how much
//     work a restriction takes compared to other restrictions in the same DoFn.
//     For example, a size of 200 represents twice as much work as a size of
//     100, but the numbers do not represent anything on their own. Size is
//     used by runners to estimate work for dynamic work rebalancing. Must be
//     thread safe, as it will be invoked concurrently during bundle processing
//     due to runner-initiated splitting and progress estimation.
//   - `CreateTracker(context.Context?, restriction) (restrictionTracker, error?)`
//     CreateTracker creates and returns a restriction tracker (a concrete type
//     implementing the `sdf.RTracker` interface) given a restriction. The
//     restriction tracker is used to track progress processing a restriction,
//     and to allow for dynamic splits. This method is called on each
//     restriction right before processing begins.
//   - `ProcessElement(context.Context?, sdf.RTracker, elem, func emit(output))
//     (sdf.ProcessContinuation?, error?)`
//     For splittable DoFns, ProcessElement requires a restriction tracker
//     before inputs, and generally requires emits to be used for outputs, since
//     restrictions will generally produce multiple outputs. For more details
//     on processing restrictions in a splittable DoFn, see `sdf.RTracker`.
//     ProcessElement can optionally return a `sdf.ProcessContinuation` to
//     signal to the runner that processing should be resumed at a later time,
//     if not all data within the restriction can be processed within the
//     lifetime of a single bundle. The runner tries to respect the resume time,
//     however it is not guaranteed.
//
// A splittable DoFn can also implement the following optional method:
//
//   - `TruncateRestriction(context.Context?, sdf.RTracker, elem) (restriction, error?)`
//     TruncateRestriction is triggered when a pipeline starts to drain on runners
//     that support pipeline draining. It helps finish the pipeline faster by
//     truncating the restriction. If not implemented, the default behavior for
//     bounded restrictions is to process the remainder of the restriction, and
//     for unbounded restrictions to process until the next SDF-initiated
//     checkpoint or runner-initiated split occurs.
//
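// A minimal sketch of the required methods, using the SDK's offsetrange
// restriction and tracker packages (io/rtrackers/offsetrange and core/sdf).
// The emitRangeFn type is illustrative and registration is elided; it emits
// the integers in [0, n):
//
//	type emitRangeFn struct{}
//
//	func (fn *emitRangeFn) CreateInitialRestriction(n int64) offsetrange.Restriction {
//		return offsetrange.Restriction{Start: 0, End: n}
//	}
//
//	func (fn *emitRangeFn) SplitRestriction(n int64, r offsetrange.Restriction) []offsetrange.Restriction {
//		return r.EvenSplits(4) // initial split into up to 4 pieces
//	}
//
//	func (fn *emitRangeFn) RestrictionSize(n int64, r offsetrange.Restriction) float64 {
//		return r.Size()
//	}
//
//	func (fn *emitRangeFn) CreateTracker(r offsetrange.Restriction) *sdf.LockRTracker {
//		return sdf.NewLockRTracker(offsetrange.NewTracker(r))
//	}
//
//	func (fn *emitRangeFn) ProcessElement(rt *sdf.LockRTracker, n int64, emit func(int64)) {
//		for i := rt.GetRestriction().(offsetrange.Restriction).Start; rt.TryClaim(i); i++ {
//			emit(i)
//		}
//	}
//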
// # Fault Tolerance
//
// In a distributed system, things can fail: machines can crash, machines can
// be unable to communicate across the network, etc. While individual failures
// are rare, the larger the job, the greater the chance that something,
// somewhere, will fail. Beam runners may strive to mask such failures by
// retrying failed DoFn bundles. This means that a DoFn instance might process
// a bundle partially, then crash for some reason, then be rerun (often as a
// new process) on that same bundle and on the same elements as before.
// Sometimes two or more DoFn instances will be running on the same bundle
// simultaneously, with the system taking the results of the first instance to
// complete successfully. Consequently, the code in a DoFn needs to be written
// such that these duplicate (sequential or concurrent) executions do not cause
// problems. If the outputs of a DoFn are a pure function of its inputs, then
// this requirement is satisfied. However, if a DoFn's execution has external
// side-effects, such as performing updates to external HTTP services, then
// the DoFn's code needs to take care to ensure that those updates are
// idempotent and that concurrent updates are acceptable. This property can be
// difficult to achieve, so it is advisable to strive to keep DoFns as pure
// functions as much as possible.
//
// # Optimization
//
// Beam runners may choose to apply optimizations to a pipeline before it is
// executed. A key optimization, fusion, relates to ParDo operations. If one
// ParDo operation produces a PCollection that is then consumed as the main
// input of another ParDo operation, the two ParDo operations will be fused
// together into a single ParDo operation and run in a single pass; this is
// "producer-consumer fusion". Similarly, if two or more ParDo operations
// have the same PCollection main input, they will be fused into a single ParDo
// that makes just one pass over the input PCollection; this is "sibling
// fusion".
//
// If after fusion there are no more unfused references to a PCollection (e.g.,
// one between a producer ParDo and a consumer ParDo), the PCollection itself
// is "fused away" and won't ever be written to disk, saving all the I/O and
// space expense of constructing it.
//
// When Beam runners apply fusion optimization, it is essentially "free" to
// write ParDo operations in a very modular, composable style, each ParDo
// operation doing one clear task, and stringing together sequences of ParDo
// operations to get the desired overall effect. Such programs can be easier to
// understand, easier to unit-test, easier to extend and evolve, and easier to
// reuse in new programs. The predefined library of PTransforms that come with
// Beam makes heavy use of this modular, composable style, trusting to the
// runner to "flatten out" all the compositions into highly optimized stages.
//
// See https://beam.apache.org/documentation/programming-guide/#pardo
// for the web documentation for ParDo.
func ParDo(s Scope, dofn any, col PCollection, opts ...Option) PCollection {
	ret := MustN(TryParDo(s, dofn, col, opts...))
	if len(ret) != 1 {
		panic(formatParDoError(dofn, len(ret), 1))
	}
	return ret[0]
}

// TODO(herohde) 6/1/2017: add windowing aspects to above documentation.

// ParDo2 inserts a ParDo with 2 outputs into the pipeline.
func ParDo2(s Scope, dofn any, col PCollection, opts ...Option) (PCollection, PCollection) {
	ret := MustN(TryParDo(s, dofn, col, opts...))
	if len(ret) != 2 {
		panic(formatParDoError(dofn, len(ret), 2))
	}
	return ret[0], ret[1]
}

// ParDo3 inserts a ParDo with 3 outputs into the pipeline.
func ParDo3(s Scope, dofn any, col PCollection, opts ...Option) (PCollection, PCollection, PCollection) {
	ret := MustN(TryParDo(s, dofn, col, opts...))
	if len(ret) != 3 {
		panic(formatParDoError(dofn, len(ret), 3))
	}
	return ret[0], ret[1], ret[2]
}

// ParDo4 inserts a ParDo with 4 outputs into the pipeline.
func ParDo4(s Scope, dofn any, col PCollection, opts ...Option) (PCollection, PCollection, PCollection, PCollection) {
	ret := MustN(TryParDo(s, dofn, col, opts...))
	if len(ret) != 4 {
		panic(formatParDoError(dofn, len(ret), 4))
	}
	return ret[0], ret[1], ret[2], ret[3]
}

// ParDo5 inserts a ParDo with 5 outputs into the pipeline.
func ParDo5(s Scope, dofn any, col PCollection, opts ...Option) (PCollection, PCollection, PCollection, PCollection, PCollection) {
	ret := MustN(TryParDo(s, dofn, col, opts...))
	if len(ret) != 5 {
		panic(formatParDoError(dofn, len(ret), 5))
	}
	return ret[0], ret[1], ret[2], ret[3], ret[4]
}

// ParDo6 inserts a ParDo with 6 outputs into the pipeline.
func ParDo6(s Scope, dofn any, col PCollection, opts ...Option) (PCollection, PCollection, PCollection, PCollection, PCollection, PCollection) {
	ret := MustN(TryParDo(s, dofn, col, opts...))
	if len(ret) != 6 {
		panic(formatParDoError(dofn, len(ret), 6))
	}
	return ret[0], ret[1], ret[2], ret[3], ret[4], ret[5]
}

// ParDo7 inserts a ParDo with 7 outputs into the pipeline.
func ParDo7(s Scope, dofn any, col PCollection, opts ...Option) (PCollection, PCollection, PCollection, PCollection, PCollection, PCollection, PCollection) {
	ret := MustN(TryParDo(s, dofn, col, opts...))
	if len(ret) != 7 {
		panic(formatParDoError(dofn, len(ret), 7))
	}
	return ret[0], ret[1], ret[2], ret[3], ret[4], ret[5], ret[6]
}

// formatParDoError is a helper function to provide a more concise error
// message to the users when a DoFn and its ParDo pairing is incorrect.
//
// We construct a new graph.Fn from the passed DoFn. We explicitly ignore the
// error since we already know it is a valid DoFn type; TryParDo would have
// returned an error otherwise.
func formatParDoError(doFn any, emitSize int, parDoSize int) string {
	doFun, _ := graph.NewFn(doFn)
	doFnName := doFun.Name()

	thisParDo := parDoForSize(parDoSize)
	correctParDo := parDoForSize(emitSize)

	return fmt.Sprintf("DoFn %v has %v outputs, but %v requires %v outputs, use %v instead.", doFnName, emitSize, thisParDo, parDoSize, correctParDo)
}

// parDoForSize takes in a DoFn's emit dimension and recommends the correct
// ParDo variant to use.
func parDoForSize(emitDim int) string {
	switch emitDim {
	case 0, 2, 3, 4, 5, 6, 7:
		return fmt.Sprintf("ParDo%d", emitDim)
	case 1:
		return "ParDo"
	default:
		return "ParDoN"
	}
}
